In [2]:
import pandas as pd
import sys
sys.path.insert(0, '/global/project/projectdirs/metatlas/anaconda/lib/python2.7/site-packages')
from molvs.standardize import enumerate_tautomers_smiles
from rdkit import Chem
import multiprocessing as mp
import time

# Load the reference compound table and take the first 100 InChIs for a test run.
ref_df = pd.read_pickle('/project/projectdirs/openmsi/projects/ben_run_pactolus/unique_compounds.pkl')
inchis = ref_df.head(100).inchi.tolist()

def process_apply(x):
    # InChI -> RDKit mol -> canonical SMILES -> set of tautomer SMILES
    return enumerate_tautomers_smiles(Chem.MolToSmiles(Chem.MolFromInchi(str(x))))


num_procs = 100  # one worker per compound for this 100-row test
p = mp.Pool(processes=num_procs)
t0 = time.time()

# map preserves input order, so results line up with inchis
pool_results = p.map(process_apply, inchis)
print time.time() - t0
p.close()
p.join()
df = pd.DataFrame(pd.Series(pool_results), columns=['tautomers'])
df.to_pickle('/project/projectdirs/openmsi/projects/chemical_networks/tautomers.pkl')
print time.time() - t0
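
Note: Chem.MolFromInchi returns None for InChIs RDKit cannot parse, and Chem.MolToSmiles(None) then raises inside the worker, failing the whole map call. A minimal defensive sketch (the empty-set fallback is an assumption, not something the original run needed):

In [ ]:
def safe_process_apply(x):
    # MolFromInchi returns None on parse failure; skip those rows
    # instead of letting one bad InChI kill the whole pool.map call.
    mol = Chem.MolFromInchi(str(x))
    if mol is None:
        return set()  # assumed fallback: empty tautomer set
    return enumerate_tautomers_smiles(Chem.MolToSmiles(mol))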

In [8]:
df


Out[8]:
tautomers
0 {C=C(C)C(CC)CCC(C)C1CCC2(C)C3=C(CCC12C)C1(C)CC...
1 {CC1=C2Oc3c(C)ccc(=C(O)N=C4C(O)NC(C(C)C)C(=O)N...
2 {CC(C)=CCCC(C)C1CC(O)C2(C=O)C3=C(CCC12C)C1(C)C...
3 {O=C(C=C(O)C(O)C(O)=C(O)CO)C(O)O, O=C(CO)C(=O)...
4 {CC(CCCC(C)C1CCC2C3CC=C4CC(=O)CCC4(C)C3CCC12C)...
5 {CC(C)CCCC(C)C1CCC2C3=CCC4C(C)(C=O)C(O)CCC4(C)...
6 {CC(CCC1OC(n2cnc3c(=N)nc[nH]c32)C(O)C1O)C1CCC2...
7 {CCCCCCCCCCCC(=O)OCC(COP(=O)(O)OCC[N+](C)(C)C)...
8 {COC1C(O)C(COP(=O)(O)O[PH](=O)(=O)O[PH](=O)(=O...
9 {COC(=O)CCC(=O)NCC(=NC(O)=C(N)CCCCN)C(=O)O, CO...
10 {CC(=O)NC(=CCP(O)O)C(N)O, CC(O)=NC(CCP(O)O)=C(...
11 {C=C(CCC(C)C1CCC2(C)C3CCC4C(C)C(O)CCC45CC35CCC...
12 {NC(CCC(O)=NC(=CSS(=O)(=O)O)C(=O)NCC(O)O)=C(O)...
13 {CCC(C=CC(C)C1CCC2C3CC=C4CC(OC5OC(CO)C(O)C(O)C...
14 {CCCCCCC1CC1CCCCCCCC(=O)N(CCO[PH](=O)(=O)OCC(O...
15 {CC=C(C)CCC=C(C)CCC=C(C)CCC=C(C)CCC=C(C)CCC=C(...
16 {CC(C)C(C)C(O)C(O)C(C)C1CCC2C3CC(O)=C4CC(O)CCC...
17 {O=C(O)C(=O)CC=CC=C(O)O, O=C(O)C=CCCC(=O)C(=O)...
18 {O=C(O)C(=O)C=CC1C(=O)C=C(O)N=C1C(=O)O, O=C1C=...
19 {CC(C)CCCC(C)C1CCC2C3CC=C4CC(OC5OC(CO)C(O)C(O)...

In [ ]:
import pandas as pd
import numpy as np
import sys
sys.path.insert(0, '/global/project/projectdirs/metatlas/anaconda/lib/python2.7/site-packages')
from molvs.standardize import enumerate_tautomers_smiles
from rdkit import Chem
import multiprocessing as mp
import time

ref_df = pd.read_pickle('/project/projectdirs/openmsi/projects/ben_run_pactolus/unique_compounds.pkl')

def process_apply(x):
    # InChI -> RDKit mol -> canonical SMILES -> set of tautomer SMILES
    return enumerate_tautomers_smiles(Chem.MolToSmiles(Chem.MolFromInchi(str(x))))

def process(df):
    # Convert one chunk of the inchi Series; apply preserves the index.
    return df.apply(process_apply)

t0 = time.time()
num_procs = 272
p = mp.Pool(processes=num_procs)
# split the Series into one chunk per worker, then map chunks to processes
split_dfs = np.array_split(ref_df.head(1000).inchi, num_procs)
pool_results = p.map(process, split_dfs)
p.close()
p.join()

# merging parts processed by different processes
parts = pd.concat(pool_results, axis=0)
parts.to_pickle('/project/projectdirs/openmsi/projects/chemical_networks/tautomers.pkl')
print time.time() - t0
# # merging newly calculated parts to big_df
# big_df = pd.concat([big_df, parts], axis=1)

# # checking if the dfs were merged correctly
# pdt.assert_series_equal(parts['id'], big_df['id'])
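
The commented-out check above is the idea; for this pipeline, pool.map returns chunks in submission order and Series.apply preserves the index, so a minimal sketch of the equivalent sanity check (assuming parts from the cell above) is:

In [ ]:
# pool.map returns chunks in submission order and pd.concat keeps that
# order, so the result index should match the source rows exactly.
assert parts.index.equals(ref_df.head(1000).index)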

In [ ]:
# tautomers = ref_df.head(100).inchi.apply(lambda x: enumerate_tautomers_smiles(Chem.MolToSmiles(Chem.MolFromInchi(str(x)))))
# smiles = rdkit_mols.apply(lambda x: Chem.MolToSmiles(x, isomericSmiles=True))
# tautomers = smiles.apply(enumerate_tautomers_smiles)
# tautomers.to_pickle('/project/projectdirs/openmsi/projects/chemical_networks/tautomer_pandas_series.pkl')

In [ ]:
# shut down the worker pool from the run above
p.close()
p.terminate()

In [ ]:
# sanity check: array_split should yield one chunk per requested partition
len(np.array_split(ref_df.head(120).inchi, 12))

In [ ]:
p.close()
p.terminate()

In [ ]:
d = [(20,26.8),(10,29.2)]

From a Stack Overflow answer: you can parallelize this with dask.dataframe. It works almost the same, except that you can't use column assignment and will instead need to use the assign method:

>>> dmaster = dd.from_pandas(master, npartitions=4)
>>> dmaster = dmaster.assign(my_value=dmaster.original.apply(lambda x: helper(x, slave), name='my_value'))
>>> dmaster.compute()

                  original  my_value
0  this is a nice sentence         2
1      this is another one         3
2    stackoverflow is nice         1

Additionally, you should think about the tradeoffs between using threads vs processes here. Your fuzzy string matching almost certainly doesn't release the GIL, so you won't get any benefit from using multiple threads. However, using processes will cause data to serialize and move around your machine, which might slow things down a bit.

You can experiment with threads, processes, or a distributed scheduler by choosing the get= keyword argument passed to the compute() method.

import dask.multiprocessing
import dask.threaded

>>> dmaster.compute(get=dask.threaded.get)  # this is default for dask.dataframe
>>> dmaster.compute(get=dask.multiprocessing.get)  # try processes instead

In [ ]:
dmaster

In [ ]:
%%time
import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
dmaster = dd.from_pandas(ref_df.head(120), npartitions=20)
dmaster = dmaster.assign(my_value=dmaster.inchi.apply(
    lambda x: enumerate_tautomers_smiles(Chem.MolToSmiles(Chem.MolFromInchi(str(x)))),
    name='my_value'))
out_df = dmaster.compute(get=dask.threaded.get)
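
For comparison, the same computation with the multiprocessing scheduler, per the answer quoted above (a sketch; process workers must serialize each partition, so this is not guaranteed to be faster for this workload):

In [ ]:
%%time
# same graph, but executed with one process per partition instead of threads
dmaster_mp = dd.from_pandas(ref_df.head(120), npartitions=20)
dmaster_mp = dmaster_mp.assign(my_value=dmaster_mp.inchi.apply(
    lambda x: enumerate_tautomers_smiles(Chem.MolToSmiles(Chem.MolFromInchi(str(x)))),
    name='my_value'))
out_df_mp = dmaster_mp.compute(get=dask.multiprocessing.get)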

In [ ]:
from molvs import Standardizer
from molvs.validate import Validator

# log validation issues for each tautomer as it is generated
fmt = '%(asctime)s - %(levelname)s - %(validation)s - %(message)s'
validator = Validator(log_format=fmt)
s = Standardizer()

mols = []
smols = []
for m in enumerate_tautomers_smiles('C[C@H]1CCC[C@@]2([C@@]1(CCCC2)O)C'):
    mol = Chem.MolFromSmiles(m)
    mols.append(mol)
    validator.validate(mol)
    # tautomer_parent returns the canonical tautomer of the molecule
    smols.append(s.tautomer_parent(mol))
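
A quick check on the loop's output (a sketch): every enumerated tautomer should canonicalize to the same tautomer parent, so deduplicating the parents' canonical SMILES should leave a single entry.

In [ ]:
# all tautomers of one compound should share one tautomer parent
parent_smiles = set(Chem.MolToSmiles(m) for m in smols)
print len(parent_smiles)
print parent_smiles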

In [ ]:
from rdkit.Chem.Draw import MolsToGridImage
MolsToGridImage(mols, molsPerRow=4, subImgSize=(300, 200), useSVG=False)